In [ ]:
from IPython.display import Image, SVG
id: Unique identifier of the customer
name: Name of the customer
transactions: List of transaction-id, amount pairs, one for each transaction for the customer in that file
In [ ]:
from accounts import create_accounts_json
num_files = 25
n = 100000 # number of accounts per file
k = 500 # number of transactions
create_accounts_json(num_files, n, k)
In [ ]:
from nfs import create_denormalized
create_denormalized()
In [ ]:
from random_array import random_array
random_array()
Dask is a flexible parallel computing library for analytics. Dask emphasizes the following virtues:
dask.distributed
In [ ]:
Image("http://dask.pydata.org/en/latest/_images/collections-schedulers.png")
In [ ]:
SVG("http://dask.pydata.org/en/latest/_images/dask-array-black-text.svg")
+, *, exp, log, ...
sum(), mean(), std(), sum(axis=0), ...
tensordot
transpose
x[:100, 500:100:-2]
x[:, [10, 1, 5]]
__array__
svd, qr, solve, solve_triangular, lstsq
In [ ]:
import numpy as np
import dask
import dask.array as da
import dask.threaded
import dask.multiprocessing
The chunk size is important and has performance implications.
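For a one-dimensional array, chunks=5 means "blocks of five elements", and each block becomes one task per operation (dask reports the resulting block sizes in the chunks attribute). The hypothetical helper block_sizes below reproduces that splitting in plain Python; it is a sketch, not dask's own chunk normalization code. It makes the trade-off visible: smaller chunks give more blocks, and therefore more tasks and scheduling overhead, while larger chunks need more memory per task.

```python
def block_sizes(length, chunk):
    """Split an axis of `length` elements into blocks of at most `chunk` elements."""
    full, rest = divmod(length, chunk)
    sizes = [chunk] * full
    if rest:
        sizes.append(rest)  # a shorter final block holds the remainder
    return tuple(sizes)


print(block_sizes(25, 5))   # (5, 5, 5, 5, 5)
print(block_sizes(25, 10))  # (10, 10, 5)
```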
In [ ]:
x = da.arange(25, chunks=5)
In [ ]:
y = x ** 2
In [ ]:
y
In [ ]:
y.visualize()
In [ ]:
y.dask.keys()
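y.dask holds the task graph: a mapping from keys to tasks. The hand-written graph below follows the same idea (keys such as ("y", 0), tasks written as tuples whose first element is a callable), but it is a simplified sketch: the two-block graph and the helpers square_block, is_key and get are hypothetical. get is a minimal synchronous executor that plays roughly the role of dask.get; unlike a real scheduler it does no caching, so a dependency shared by several tasks would be recomputed.

```python
def square_block(block):
    return [v ** 2 for v in block]


# Keys are (name, index) tuples; a task is a tuple whose first element is a callable
dsk = {
    ("x", 0): [0, 1, 2, 3, 4],
    ("x", 1): [5, 6, 7, 8, 9],
    ("y", 0): (square_block, ("x", 0)),
    ("y", 1): (square_block, ("x", 1)),
}


def is_key(graph, obj):
    try:
        return obj in graph
    except TypeError:  # unhashable objects such as lists cannot be keys
        return False


def get(graph, key):
    """Evaluate `key` in the current thread, computing its dependencies first."""
    task = graph[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *args = task
        resolved = [get(graph, a) if is_key(graph, a) else a for a in args]
        return func(*resolved)
    return task  # plain data, e.g. an input block


print(get(dsk, ("y", 1)))  # [25, 36, 49, 64, 81]
```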
Call compute to execute the task graph and return the result as a NumPy array.
In [ ]:
y.compute()
Dask arrays also implement the NumPy __array__ protocol, so np.array(y) computes y and returns a NumPy array.
In [ ]:
np.array(y)
dask.get is an alias for the synchronous backend, which runs every task in the current thread. Passing it to compute is useful for debugging.
In [ ]:
y.compute(get=dask.get)
dask.threaded.get, a thread-pool scheduler, is the default for dask arrays.
In [ ]:
y.compute(get=dask.threaded.get)
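The threaded scheduler runs independent tasks, such as squaring each block, on a pool of threads that share memory, so blocks never need to be copied between workers. A minimal stdlib sketch of that idea (not dask's scheduler) follows; max_workers=4 is an arbitrary assumption. Pure-Python work like this is limited by the GIL, so the approach pays off when tasks release the GIL, as many NumPy routines do.

```python
from concurrent.futures import ThreadPoolExecutor


def square_block(block):
    return [v ** 2 for v in block]


# Five blocks of five elements, like da.arange(25, chunks=5)
blocks = [list(range(start, start + 5)) for start in range(0, 25, 5)]

with ThreadPoolExecutor(max_workers=4) as pool:
    # pool.map returns results in input order, so they line up with the blocks
    squared_blocks = list(pool.map(square_block, blocks))

result = [v for block in squared_blocks for v in block]
print(result[:6])  # [0, 1, 4, 9, 16, 25]
```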
In [ ]:
from multiprocessing import cpu_count
cpu_count()
fork creates a new child process which is a copy (copy-on-write) of the parent process.
In [ ]:
y.compute(get=dask.multiprocessing.get)
dask.distributed library
In [ ]:
import h5py
import os
f = h5py.File(os.path.join('..', 'data', 'random.hdf5'), mode='r')
dset = f['/x']
In [ ]:
sums = []
# Sum the 1e9-element dataset one 1e6-element block at a time
for i in range(0, 1000000000, 1000000):
    chunk = dset[i: i + 1000000]
    sums.append(chunk.sum())
total = np.sum(sums)
print(total / 1e9)  # the mean
In [ ]:
x = da.from_array(dset, chunks=(1000000, ))
x
In [ ]:
result = x.mean()
In [ ]:
result
In [ ]:
result.compute()
In [ ]:
x[:10].compute()
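A blocked mean cannot simply average the per-block means, because blocks can differ in size. The idea is to keep a running sum and count across blocks and divide once at the end, the same principle as the manual loop over dset above. The helper blocked_mean is a hypothetical plain-Python sketch of that idea, not dask's reduction code.

```python
def blocked_mean(values, chunk):
    """Mean computed block by block from running (sum, count) totals."""
    total = 0.0
    count = 0
    for start in range(0, len(values), chunk):
        block = values[start:start + chunk]  # stands in for one dask chunk
        total += sum(block)
        count += len(block)
    return total / count


values = [1, 2, 3, 4, 5, 6, 7]
print(blocked_mean(values, 3))  # 4.0

# Averaging per-block means is wrong when the last block is shorter
block_means = [sum(values[i:i + 3]) / len(values[i:i + 3]) for i in range(0, 7, 3)]
print(sum(block_means) / len(block_means))  # about 4.67, not 4.0
```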
Use dask.array.random.normal to create a 20,000 x 20,000 array $X \sim N(10, 0.1)$ with chunks set to (1000, 1000).
Take the mean along axis 0, then keep every 100th element of the result.
Hint: Recall you can slice with the following syntax [start:end:step]
In [ ]:
# [Solution here]
In [ ]:
%load solutions/dask_array.py
Your performance may vary. If you attempt the NumPy version then please ensure that you have more than 4GB of main memory.
In [ ]:
import numpy as np
In [ ]:
%%time
x = np.random.normal(10, 0.1, size=(20000, 20000))
y = x.mean(axis=0)[::100]
y
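The dask version is faster and needs only megabytes of memory, because it works on (1000, 1000) chunks rather than on the whole array at once.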
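The memory figures follow from simple arithmetic, assuming float64 elements (8 bytes each, NumPy's default for random.normal). The whole array needs about 3.2 GB, which is why the NumPy version asks for more than 4GB of main memory, while one (1000, 1000) chunk needs 8 MB. In practice dask holds a few chunks at a time (roughly one per worker plus intermediates), so its footprint is a small multiple of that.

```python
itemsize = 8  # bytes per float64 element (assumed dtype)

full_array = 20000 * 20000 * itemsize  # the whole NumPy array, in bytes
one_chunk = 1000 * 1000 * itemsize     # one (1000, 1000) dask chunk, in bytes

print(full_array / 1e9, "GB")   # 3.2 GB
print(one_chunk / 1e6, "MB")    # 8.0 MB
print(full_array // one_chunk)  # 400 chunks in total
```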
In [ ]:
%%time
x = da.random.normal(10, 0.1, size=(20000, 20000), chunks=(1000, 1000))
y = x.mean(axis=0)[::100]
y.compute()
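Dask array also provides linear algebra routines, for example:
da.linalg.qr
da.linalg.cholesky
da.linalg.svd

Dask bag parallelizes operations on collections of generic Python objects, such as map, filter, fold, frequencies and groupby.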
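On a single in-memory list, the same operations map onto the Python standard library, as the sketch below shows with made-up names; a dask bag applies them partition by partition in parallel. The correspondence is illustrative and not how dask implements them.

```python
import functools
from collections import Counter, defaultdict

names = ["Alice", "Bob", "Alice", "Dan", "Edith", "Bob", "Alice"]

lengths = list(map(len, names))                                        # map
a_names = list(filter(lambda n: n.startswith("A"), names))             # filter
total_chars = functools.reduce(lambda acc, n: acc + len(n), names, 0)  # fold
freqs = Counter(names)                                                 # frequencies

groups = defaultdict(list)                                             # groupby
for n in names:
    groups[len(n)].append(n)

print(freqs.most_common(1))  # [('Alice', 3)]
```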
In [ ]:
import os
import dask.bag as db
In [ ]:
bag = db.read_text(os.path.join('..', 'data', 'accounts.*.json.gz'))
In [ ]:
bag.take(3)
In [ ]:
import json
In [ ]:
js = bag.map(json.loads)
In [ ]:
js.take(3)
In [ ]:
counts = js.pluck('name').frequencies()
In [ ]:
counts.compute()
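frequencies parallelizes well because counts computed on separate partitions can simply be added. The sketch below imitates map(json.loads), pluck('name') and frequencies() on two hypothetical partitions of made-up JSON lines; the record contents are assumptions and the code is not dask's implementation.

```python
import json
from collections import Counter

# Two hypothetical partitions (think: two accounts.*.json.gz files)
partitions = [
    ['{"id": 1, "name": "Alice", "transactions": []}',
     '{"id": 2, "name": "Bob", "transactions": []}'],
    ['{"id": 3, "name": "Alice", "transactions": []}'],
]


def partition_frequencies(lines):
    # json.loads + pluck('name') + counting, within one partition
    return Counter(json.loads(line)["name"] for line in lines)


# Counter addition sums the per-partition counts key by key
counts = sum((partition_frequencies(p) for p in partitions), Counter())
print(dict(counts))  # {'Alice': 2, 'Bob': 1}
```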
Exercise: use filter and take to get the number of transactions for the first five users named "Alice".
1. Write a function count_transactions that takes a dictionary from accounts and returns a dictionary that holds the name and a key count, the number of transactions for that user id.
2. Use filter to get the accounts where the user is named Alice, and map the function you just created to get the number of transactions for each user named Alice.
3. pluck the count and display the first 5.
In [ ]:
%load solutions/bag_alice.py
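The same filter, map, pluck and take pipeline can be tried in plain Python on a small list. This is a sketch under assumptions: the record layout follows the field descriptions at the top (id, name, transactions as transaction-id, amount pairs), the records are made up, and the loaded solution file may be written differently.

```python
def count_transactions(account):
    # Hypothetical sketch: assumes the "name" and "transactions" keys described above
    return {"name": account["name"], "count": len(account["transactions"])}


# Made-up records; each transaction is a [transaction-id, amount] pair
accounts = [
    {"id": 1, "name": "Alice", "transactions": [[10, 250], [11, 75]]},
    {"id": 2, "name": "Bob", "transactions": [[12, 30]]},
    {"id": 3, "name": "Alice", "transactions": []},
]

alices = filter(lambda acct: acct["name"] == "Alice", accounts)  # filter
counted = map(count_transactions, alices)                        # map
counts = [d["count"] for d in counted]                           # pluck
print(counts[:5])  # take(5): [2, 0]
```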
In [ ]:
b = db.from_sequence(['Alice', 'Bob', 'Charlie', 'Dan', 'Edith', 'Frank'])
b.groupby(len).compute()
In [ ]:
b = db.from_sequence(list(range(10)))
b.groupby(lambda x: x % 2).compute()
Group by evens and odds and take the largest value in each group.
In [ ]:
b.groupby(lambda x: x % 2).map(lambda kv: (kv[0], max(kv[1]))).compute()
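foldby is similar to the combineByKey method on Spark RDDs. When using foldby you provide:
1. A key function on which to group elements
2. A binary operator (binop), such as you would pass to reduce, that performs the reduction within each group
3. A combine binary operator that merges the results of two reductions computed on different parts of your dataset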
Your reduction must be associative. It will happen in parallel in each of the partitions of your dataset. Then all of these intermediate results will be combined by the combine binary operator.
This is just what we saw in sum above
functools.reduce works like so
In [ ]:
import functools
In [ ]:
values = range(10)
In [ ]:
def func(acc, y):
    print(acc)
    print(y)
    print()
    return acc + y
In [ ]:
functools.reduce(func, values)
In [ ]:
b.foldby(lambda x: x % 2, binop=max, combine=max).compute()
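The two-step structure of foldby (reduce within each partition, then combine across partitions) can be written out sequentially. This sketch is not dask's implementation: its parameter names follow dask's foldby (key, binop, initial, combine, combine_initial), but unlike dask it requires both initial values and returns a dict, whereas dask returns (key, value) pairs.

```python
def foldby(partitions, key, binop, initial, combine, combine_initial):
    """Sequential sketch of a partitioned fold-by-key."""
    # Step 1: fold each partition separately into a {key: accumulator} dict
    partials = []
    for part in partitions:
        acc = {}
        for x in part:
            k = key(x)
            acc[k] = binop(acc.get(k, initial), x)
        partials.append(acc)
    # Step 2: merge the per-partition accumulators with `combine`
    result = {}
    for acc in partials:
        for k, v in acc.items():
            result[k] = combine(result.get(k, combine_initial), v)
    return result


parts = [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
print(foldby(parts, lambda x: x % 2, max, float("-inf"), max, float("-inf")))
# {0: 8, 1: 9}
```

Counting people per name fits the same shape: binop=lambda total, x: total + 1 with initial=0, and an additive combine with combine_initial=0.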
Using the accounts data above, find the number of people with the same name
In [ ]:
js.take(1)
In [ ]:
from dask.diagnostics import ProgressBar
In [ ]:
counts = js.foldby(key='name',
                   binop=lambda total, x: total + 1,
                   initial=0,
                   combine=lambda a, b: a + b,
                   combine_initial=0)
In [ ]:
with ProgressBar():
    result = counts.compute()
In [ ]:
result
In [ ]:
%load solutions/bag_foldby.py
<img src="http://dask.pydata.org/en/latest/_images/dask-dataframe.svg" width="30%">
Trivially parallelizable operations (fast):
df.x + df.y, df * df
df[df.x > 0]
df.loc[4.0:10.5]
df.x.max(), df.max()
df[df.x.isin([1, 2, 3])]
df.timestamp.month

Cleverly parallelizable operations (fast):
df.groupby(df.x).y.max(), df.groupby('x').max()
df.x.value_counts()
df.x.drop_duplicates()
dd.merge(df1, df2, left_index=True, right_index=True)
dd.merge(df1, df2, on='id')
df1.x + df2.y
df.resample(...)
df.rolling(...)
df[['col1', 'col2']].corr()

Operations requiring a shuffle (slow-ish, unless on index):
df.set_index(df.x)
df.groupby(df.x).apply(myfunc)
dd.merge(df1, df2, on='name')
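Shuffle operations are slower because rows must move between partitions so that equal keys end up together, and every input partition may send data to every output partition. Hash partitioning is one common way to do this; the shuffle helper below is a hypothetical stdlib sketch of that strategy on lists of (key, value) rows, not dask's actual shuffle.

```python
def shuffle(partitions, key, npartitions):
    """Move every row to output partition hash(key(row)) % npartitions."""
    out = [[] for _ in range(npartitions)]
    for part in partitions:
        for row in part:
            out[hash(key(row)) % npartitions].append(row)
    return out


parts = [[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]]
shuffled = shuffle(parts, lambda row: row[0], 3)
# Both ("a", ...) rows now sit in the same output partition
print(shuffled)
```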
In [ ]:
import dask.dataframe as dd
In [ ]:
df = dd.read_csv("../data/NationalFoodSurvey/NFS*.csv")
DataFrame.head is one operation that is not lazy
In [ ]:
df.head(5)
In [ ]:
df.npartitions
In [ ]:
df.known_divisions
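Set the partitions on styr to make some operations more performant. The divisions are the boundaries between partitions, and the last value is the inclusive upper bound of the final partition. So the divisions
[1974, 1975, 1976]
would give 2 partitions. The first contains 1974. The second contains 1975 and 1976. To get three partitions, with the final observation in a partition of its own, duplicate it:
[1974, 1975, 1976, 1976]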
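The rule for divisions can be checked with a binary search: partition i covers values from divisions[i] up to (but excluding) divisions[i + 1], and the last partition also includes the final division value. The helper partition_for is a hypothetical sketch that assumes the value lies within the divisions' range.

```python
from bisect import bisect_right


def partition_for(value, divisions):
    """Index of the partition holding `value`, given sorted divisions."""
    npartitions = len(divisions) - 1
    if value == divisions[-1]:
        return npartitions - 1  # the last partition includes its upper bound
    return bisect_right(divisions, value) - 1


print([partition_for(y, [1974, 1975, 1976]) for y in (1974, 1975, 1976)])        # [0, 1, 1]
print([partition_for(y, [1974, 1975, 1976, 1976]) for y in (1974, 1975, 1976)])  # [0, 1, 2]
```

With the divisions used below, list(range(1974, 2001)) + [2000], the year 2000 lands in partition 26.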
In [ ]:
partitions = list(range(1974, 2001)) + [2000]
df = df.set_partition('styr', divisions=partitions)
In [ ]:
df.known_divisions
In [ ]:
df.divisions
In [ ]:
df.info()
In addition to the (supported) pandas DataFrame API, dask provides a few more convenient methods:
DataFrame.categorize
DataFrame.map_partitions
DataFrame.get_division
DataFrame.repartition
DataFrame.set_partition
DataFrame.to_{bag|castra}
DataFrame.visualize

A few methods have a slightly different API:
DataFrame.apply
GroupBy.apply
In [ ]:
df2000 = df.get_division(26)
In [ ]:
type(df2000)
What food group was consumed the most times in 2000?
In [ ]:
df2000.set_index('minfd')
In [ ]:
grp = df2000.groupby('minfd')
In [ ]:
size = grp.apply(len, columns='size')
In [ ]:
size.head()
In [ ]:
minfd = size.compute().idxmax()
In [ ]:
print(minfd)
In [ ]:
import pandas as pd
food_mapping = pd.read_csv("../data/NationalFoodSurvey/food_mapping.csv")
Look up the description of that food group with the isin method.
In [ ]:
food_mapping.loc[food_mapping.minfd.isin([minfd])]
In [ ]:
# [Solution here]
In [ ]:
%load solutions/nfs_most_purchased.py
In [ ]:
def most_frequent_food(partition):
    # partition is a pandas.DataFrame holding one survey year (styr)
    grpr = partition.groupby('minfd')
    size = grpr.size()
    minfd = size.idxmax()  # food group code with the most rows
    idx = food_mapping.minfd.isin([minfd])
    description = food_mapping.loc[idx].minfddesc.iloc[0]
    year = int(partition.styr.iloc[0])
    return year, description
In [ ]:
mnfd_year = df.map_partitions(most_frequent_food)
In [ ]:
mnfd_year.compute()
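map_partitions calls most_frequent_food once per pandas partition, independently. That gives one answer per year only because, after set_partition on styr, each partition holds exactly one survey year; a year split across two partitions would be counted separately in each. The sketch below mimics this with made-up (styr, minfd) rows and a hypothetical most_frequent_code helper.

```python
from collections import Counter

# Made-up partitions of (styr, minfd) rows, one partition per year
partitions = [
    [(1974, 11), (1974, 11), (1974, 42)],
    [(1975, 42), (1975, 42), (1975, 11)],
]


def most_frequent_code(rows):
    # Count minfd codes in this partition and return (year, most common code)
    year = rows[0][0]
    code, _ = Counter(minfd for _, minfd in rows).most_common(1)[0]
    return year, code


results = [most_frequent_code(p) for p in partitions]  # the map_partitions step
print(results)  # [(1974, 11), (1975, 42)]
```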
In [ ]:
list(mnfd_year.compute())
minfd and calculate daily per capita consumption of each food group. Hint: you want to use map_partitions.
In [ ]:
%load solutions/average_consumption.py
In [ ]:
Image('images/bcolz_bench.png')